{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exploring tf.transform # \n",
    "\n",
    "**Learning Objectives**\n",
    "1. Preprocess data and engineer new features using TfTransform. \n",
    "1. Create and deploy Apache Beam pipeline. \n",
    "1. Use processed data to train taxifare model locally then serve a prediction.\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "## Introduction \n",
    "While Pandas is fine for experimenting, for operationalization of your workflow it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam allows for streaming. In this lab we will pull data from BigQuery then use Apache Beam  TfTransform to process the data.  \n",
    "\n",
    "Only specific combinations of TensorFlow/Beam are supported by tf.transform so make sure to get a combo that works. In this lab we will be using: \n",
    "* TFT 0.24.0\n",
    "* TF 2.3.0 \n",
    "* Apache Beam [GCP] 2.24.0\n",
    "\n",
    "Each learning objective will correspond to a __#TODO__ in the notebook where you will complete the notebook cell's code before running. Refer to the [solution notebook](../solutions/5_tftransform_taxifare.ipynb) for reference. \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install tensorflow==2.3.0 tensorflow-transform==0.24.0 apache-beam[gcp]==2.24.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**NOTE**: You may ignore specific incompatibility errors and warnings. These components and issues do not impact your ability to complete the lab.",
    "\n",
    "Download .whl file for tensorflow-transform. We will pass this file to Beam Pipeline Options so it is installed on the DataFlow workers "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install --user google-cloud-bigquery==1.25.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip download tensorflow-transform==0.24.0 --no-deps"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<b>Restart the kernel</b> (click on the reload button above)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "pip freeze | grep -e 'flow\\|beam'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import tensorflow_transform as tft\n",
    "import shutil\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# change these to try this notebook out\n",
    "BUCKET = 'bucket-name'\n",
    "PROJECT = 'project-id'\n",
    "REGION = 'us-central1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "gcloud config set project $PROJECT\n",
    "gcloud config set compute/region $REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! gcloud storage ls | grep -q gs://${BUCKET}/; then\n",    "  gcloud storage buckets create --location ${REGION} gs://${BUCKET}\n",    "fi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Input source: BigQuery\n",
    "\n",
    "Get data from BigQuery but defer the majority of filtering etc. to Beam.\n",
    "Note that the dayofweek column is now strings."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from google.cloud import bigquery\n",
    "\n",
    "\n",
    "def create_query(phase, EVERY_N):\n",
    "    \"\"\"Creates a query with the proper splits.\n",
    "\n",
    "    Args:\n",
    "        phase: int, 1=train, 2=valid.\n",
    "        EVERY_N: int, take an example EVERY_N rows.\n",
    "\n",
    "    Returns:\n",
    "        Query string with the proper splits.\n",
    "    \"\"\"\n",
    "    base_query = \"\"\"\n",
    "    WITH daynames AS\n",
    "    (SELECT ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek)\n",
    "    SELECT\n",
    "    (tolls_amount + fare_amount) AS fare_amount,\n",
    "    daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,\n",
    "    EXTRACT(HOUR FROM pickup_datetime) AS hourofday,\n",
    "    pickup_longitude AS pickuplon,\n",
    "    pickup_latitude AS pickuplat,\n",
    "    dropoff_longitude AS dropofflon,\n",
    "    dropoff_latitude AS dropofflat,\n",
    "    passenger_count AS passengers,\n",
    "    'notneeded' AS key\n",
    "    FROM\n",
    "    `nyc-tlc.yellow.trips`, daynames\n",
    "    WHERE\n",
    "    trip_distance > 0 AND fare_amount > 0\n",
    "    \"\"\"\n",
    "    if EVERY_N is None:\n",
    "        if phase < 2:\n",
    "            # training\n",
    "            query = \"\"\"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST\n",
    "            (pickup_datetime AS STRING), 4)) < 2\"\"\".format(base_query)\n",
    "        else:\n",
    "            query = \"\"\"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(\n",
    "            pickup_datetime AS STRING), 4)) = {1}\"\"\".format(base_query, phase)\n",
    "    else:\n",
    "        query = \"\"\"{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(\n",
    "        pickup_datetime AS STRING)), {1})) = {2}\"\"\".format(\n",
    "            base_query, EVERY_N, phase)\n",
    "\n",
    "    return query\n",
    "\n",
    "query = create_query(2, 100000)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_valid = bigquery.Client().query(query).to_dataframe()\n",
    "display(df_valid.head())\n",
    "df_valid.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create ML dataset using tf.transform and Dataflow\n",
    "\n",
    "Let's use Cloud Dataflow to read in the BigQuery data and write it out as TFRecord files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`transformed_data` is type `pcollection`.\n",
    "\n",
    "5 __TODO's__ in the following cell block \n",
    "1. Convert day of week from string->int with `tft.string_to_int`\n",
    "1. Scale `pickuplat`, `pickuplon`, `dropofflat`, `dropofflon` between 0 and 1 with `tft.scale_to_0_1`\n",
    "1. Scale our engineered features `latdiff` and `londiff` between 0 and 1\n",
    "1. Analyze and transform our training data using `beam_impl.AnalyzeAndTransformDataset()`\n",
    "1. Read eval data from BigQuery using `beam.io.BigQuerySource` and filter rows using our `is_valid` function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "import tensorflow as tf\n",
    "import apache_beam as beam\n",
    "import tensorflow_transform as tft\n",
    "import tensorflow_metadata as tfmd\n",
    "from tensorflow_transform.beam import impl as beam_impl\n",
    "\n",
    "\n",
    "def is_valid(inputs):\n",
    "    \"\"\"Check to make sure the inputs are valid.\n",
    "\n",
    "    Args:\n",
    "        inputs: dict, dictionary of TableRow data from BigQuery.\n",
    "\n",
    "    Returns:\n",
    "        True if the inputs are valid and False if they are not.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        pickup_longitude = inputs['pickuplon']\n",
    "        dropoff_longitude = inputs['dropofflon']\n",
    "        pickup_latitude = inputs['pickuplat']\n",
    "        dropoff_latitude = inputs['dropofflat']\n",
    "        hourofday = inputs['hourofday']\n",
    "        dayofweek = inputs['dayofweek']\n",
    "        passenger_count = inputs['passengers']\n",
    "        fare_amount = inputs['fare_amount']\n",
    "        return fare_amount >= 2.5 and pickup_longitude > -78 \\\n",
    "            and pickup_longitude < -70 and dropoff_longitude > -78 \\\n",
    "            and dropoff_longitude < -70 and pickup_latitude > 37 \\\n",
    "            and pickup_latitude < 45 and dropoff_latitude > 37 \\\n",
    "            and dropoff_latitude < 45 and passenger_count > 0\n",
    "    except:\n",
    "        return False\n",
    "\n",
    "\n",
    "def preprocess_tft(inputs):\n",
    "    \"\"\"Preprocess the features and add engineered features with tf transform.\n",
    "\n",
    "    Args:\n",
    "        dict, dictionary of TableRow data from BigQuery.\n",
    "\n",
    "    Returns:\n",
    "        Dictionary of preprocessed data after scaling and feature engineering.\n",
    "    \"\"\"\n",
    "    import datetime\n",
    "    print(inputs)\n",
    "    result = {}\n",
    "    result['fare_amount'] = tf.identity(inputs['fare_amount'])\n",
    "    # Build a vocabulary\n",
    "\n",
    "    # TODO 1: convert day of week from string->int with tft.string_to_int\n",
    "\n",
    "    result['hourofday'] = tf.identity(inputs['hourofday'])  # pass through\n",
    "\n",
    "    # TODO 2: scale pickup/dropoff lat/lon between 0 and 1 with tft.scale_to_0_1\n",
    "\n",
    "    result['passengers'] = tf.cast(inputs['passengers'], tf.float32)  # a cast\n",
    "    # Arbitrary TF func\n",
    "    result['key'] = tf.as_string(tf.ones_like(inputs['passengers']))\n",
    "\n",
    "    # Engineered features\n",
    "    latdiff = inputs['pickuplat'] - inputs['dropofflat']\n",
    "    londiff = inputs['pickuplon'] - inputs['dropofflon']\n",
    "\n",
    "    # TODO 3: Scale our engineered features latdiff and londiff between 0 and 1\n",
    "\n",
    "    dist = tf.sqrt(latdiff * latdiff + londiff * londiff)\n",
    "    result['euclidean'] = tft.scale_to_0_1(dist)\n",
    "    return result\n",
    "\n",
    "\n",
    "def preprocess(in_test_mode):\n",
    "    \"\"\"Sets up preprocess pipeline.\n",
    "\n",
    "    Args:\n",
    "        in_test_mode: bool, False to launch DataFlow job, True to run locally.\n",
    "    \"\"\"\n",
    "    import os\n",
    "    import os.path\n",
    "    import tempfile\n",
    "    from apache_beam.io import tfrecordio\n",
    "    from tensorflow_transform.coders import example_proto_coder\n",
    "    from tensorflow_transform.tf_metadata import dataset_metadata\n",
    "    from tensorflow_transform.tf_metadata import dataset_schema\n",
    "    from tensorflow_transform.beam import tft_beam_io\n",
    "    from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "\n",
    "    job_name = 'preprocess-taxi-features' + '-'\n",
    "    job_name += datetime.datetime.now().strftime('%y%m%d-%H%M%S')\n",
    "    if in_test_mode:\n",
    "        import shutil\n",
    "        print('Launching local job ... hang on')\n",
    "        OUTPUT_DIR = './preproc_tft'\n",
    "        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n",
    "        EVERY_N = 100000\n",
    "    else:\n",
    "        print('Launching Dataflow job {} ... hang on'.format(job_name))\n",
    "        OUTPUT_DIR = 'gs://{0}/taxifare/preproc_tft/'.format(BUCKET)\n",
    "        import subprocess\n",
    "        subprocess.call('gcloud storage rm --recursive {}'.format(OUTPUT_DIR).split())\n",    "        EVERY_N = 10000\n",
    "\n",
    "    options = {\n",
    "        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),\n",
    "        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),\n",
    "        'job_name': job_name,\n",
    "        'project': PROJECT,\n",
    "        'num_workers': 1,\n",
    "        'max_num_workers': 1,\n",
    "        'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "        'no_save_main_session': True,\n",
    "        'direct_num_workers': 1,\n",
    "        'extra_packages': ['tensorflow_transform-0.24.0-py3-none-any.whl']\n",
    "        }\n",
    "\n",
    "    opts = beam.pipeline.PipelineOptions(flags=[], **options)\n",
    "    if in_test_mode:\n",
    "        RUNNER = 'DirectRunner'\n",
    "    else:\n",
    "        RUNNER = 'DataflowRunner'\n",
    "\n",
    "    # Set up raw data metadata\n",
    "    raw_data_schema = {\n",
    "        colname: dataset_schema.ColumnSchema(\n",
    "            tf.string, [], dataset_schema.FixedColumnRepresentation())\n",
    "        for colname in 'dayofweek,key'.split(',')\n",
    "    }\n",
    "\n",
    "    raw_data_schema.update({\n",
    "        colname: dataset_schema.ColumnSchema(\n",
    "            tf.float32, [], dataset_schema.FixedColumnRepresentation())\n",
    "        for colname in\n",
    "        'fare_amount,pickuplon,pickuplat,dropofflon,dropofflat'.split(',')\n",
    "    })\n",
    "\n",
    "    raw_data_schema.update({\n",
    "        colname: dataset_schema.ColumnSchema(\n",
    "            tf.int64, [], dataset_schema.FixedColumnRepresentation())\n",
    "        for colname in 'hourofday,passengers'.split(',')\n",
    "    })\n",
    "\n",
    "    raw_data_metadata = dataset_metadata.DatasetMetadata(\n",
    "        dataset_schema.Schema(raw_data_schema))\n",
    "\n",
    "    # Run Beam\n",
    "    with beam.Pipeline(RUNNER, options=opts) as p:\n",
    "        with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):\n",
    "            # Save the raw data metadata\n",
    "            (raw_data_metadata |\n",
    "                'WriteInputMetadata' >> tft_beam_io.WriteMetadata(\n",
    "                    os.path.join(\n",
    "                        OUTPUT_DIR, 'metadata/rawdata_metadata'), pipeline=p))\n",
    "\n",
    "            # TODO 4: Analyze and transform our training data\n",
    "            # using beam_impl.AnalyzeAndTransformDataset()\n",
    "\n",
    "            raw_dataset = (raw_data, raw_data_metadata)\n",
    "\n",
    "            # Analyze and transform training data\n",
    "            transformed_dataset, transform_fn = (\n",
    "                raw_dataset | beam_impl.AnalyzeAndTransformDataset(\n",
    "                    preprocess_tft))\n",
    "            transformed_data, transformed_metadata = transformed_dataset\n",
    "\n",
    "            # Save transformed train data to disk in efficient tfrecord format\n",
    "            transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(\n",
    "                os.path.join(OUTPUT_DIR, 'train'), file_name_suffix='.gz',\n",
    "                coder=example_proto_coder.ExampleProtoCoder(\n",
    "                    transformed_metadata.schema))\n",
    "\n",
    "            # TODO 5: Read eval data from BigQuery using beam.io.BigQuerySource\n",
    "            # and filter rows using our is_valid function\n",
    "\n",
    "            raw_test_dataset = (raw_test_data, raw_data_metadata)\n",
    "\n",
    "            # Transform eval data\n",
    "            transformed_test_dataset = (\n",
    "                (raw_test_dataset, transform_fn) | beam_impl.TransformDataset()\n",
    "                )\n",
    "            transformed_test_data, _ = transformed_test_dataset\n",
    "\n",
    "            # Save transformed train data to disk in efficient tfrecord format\n",
    "            (transformed_test_data |\n",
    "                'WriteTestData' >> tfrecordio.WriteToTFRecord(\n",
    "                    os.path.join(OUTPUT_DIR, 'eval'), file_name_suffix='.gz',\n",
    "                    coder=example_proto_coder.ExampleProtoCoder(\n",
    "                        transformed_metadata.schema)))\n",
    "\n",
    "            # Save transformation function to disk for use at serving time\n",
    "            (transform_fn |\n",
    "                'WriteTransformFn' >> transform_fn_io.WriteTransformFn(\n",
    "                    os.path.join(OUTPUT_DIR, 'metadata')))\n",
    "\n",
    "# Change to True to run locally\n",
    "preprocess(in_test_mode=False)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This will take __10-15 minutes__. You cannot go on in this lab until your DataFlow job has successfully completed. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note**: The above command may fail with an error **`Workflow failed. Causes: There was a problem refreshing your credentials`**. In that case, `re-run` the command again."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's check to make sure that there is data where we expect it to be now."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "# ls preproc_tft\n",
    "gcloud storage ls gs://${BUCKET}/taxifare/preproc_tft/"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train off preprocessed data ##\n",
    "Now that we have our data ready and verified it is in the correct location we can train our taxifare model locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "rm -r ./taxi_trained\n",
    "export PYTHONPATH=${PYTHONPATH}:$PWD\n",
    "python3 -m tft_trainer.task \\\n",
    "    --train_data_path=\"gs://${BUCKET}/taxifare/preproc_tft/train*\" \\\n",
    "    --eval_data_path=\"gs://${BUCKET}/taxifare/preproc_tft/eval*\"  \\\n",
    "    --output_dir=./taxi_trained \\"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls $PWD/taxi_trained/export/exporter"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's create fake data in JSON format and use it to serve a prediction with gcloud ai-platform local predict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile /tmp/test.json\n",
    "{\"dayofweek\":0, \"hourofday\":17, \"pickuplon\": -73.885262, \"pickuplat\": 40.773008, \"dropofflon\": -73.987232, \"dropofflat\": 40.732403, \"passengers\": 2.0}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "sudo find \"/usr/lib/google-cloud-sdk/lib/googlecloudsdk/command_lib/ml_engine\" -name '*.pyc' -delete"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "model_dir=$(ls $PWD/taxi_trained/export/exporter/)\n",
    "gcloud ai-platform local predict \\\n",
    "    --model-dir=./taxi_trained/export/exporter/${model_dir} \\\n",
    "    --json-instances=/tmp/test.json"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2022 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
